---
title: Server Services
sidebar_position: 4
---

import Admonition from '@theme/Admonition';

# Server Services

The Server's business logic is organized into modular service classes and functions.

## Service Organization

### Storage Services (`/services/storage/`)

| Service/Module | Purpose | Key Components |
|---------|---------|-------------|
| **BaseStorageService** | Abstract base class for storage operations | `smart_chunk_text()`, `extract_metadata()`, `batch_process_with_progress()` |
| **storage_services.py** | Contains DocumentStorageService class | `upload_document()`, `store_documents()`, `process_document()` |
| **document_storage_service.py** | Storage utility functions | `add_documents_to_supabase()`, `add_documents_to_supabase_parallel()` |
| **code_storage_service.py** | Code extraction utilities | `extract_code_blocks()`, `generate_code_example_summary()`, `add_code_examples_to_supabase()` |

<Admonition type="info" title="Storage Module Organization">
The storage module has both service classes and utility functions:
- **storage_services.py** contains the `DocumentStorageService` class that extends `BaseStorageService`
- **document_storage_service.py** and **code_storage_service.py** contain standalone utility functions
- This separation allows both object-oriented and functional approaches as needed
</Admonition>
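
A minimal sketch of the two entry points, assuming constructor and argument names for illustration (they are not the verified signatures):

```python
# Illustrative only: constructor and argument names are assumptions.
from ..services.storage import DocumentStorageService
from ..services.storage.document_storage_service import add_documents_to_supabase


async def store_both_ways(supabase_client, file_bytes: bytes, urls: list[str], chunks: list[str]):
    # Object-oriented path: the class inherits chunking, metadata extraction,
    # and batching from BaseStorageService.
    storage = DocumentStorageService(supabase_client)  # assumed constructor
    await storage.upload_document(file_bytes, filename="guide.pdf")  # assumed kwargs

    # Functional path: call the utility directly with pre-chunked content.
    await add_documents_to_supabase(supabase_client, urls, chunks)  # assumed args
```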

### Search Services (`/services/search/`)

| Service/Module | Purpose | Key Components |
|---------|---------|-------------|
| **search_services.py** | Contains SearchService class | `perform_rag_query()`, `search_code_examples_service()`, `rerank_results()` |
| **vector_search_service.py** | Vector search utilities | `search_documents()`, `search_code_examples()`, `search_documents_async()` |

<Admonition type="info" title="Search Module Organization">
- **search_services.py** contains the `SearchService` class for high-level search operations
- **vector_search_service.py** contains low-level vector search utility functions
- The SearchService uses vector search utilities internally
</Admonition>
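
A hedged sketch of that layering (constructors and keyword arguments below are assumptions):

```python
from ..services.search import SearchService
from ..services.search.vector_search_service import search_documents


async def query_both_ways(supabase_client, query: str):
    # High-level: RAG query with reranking applied internally when enabled.
    service = SearchService(supabase_client)  # assumed constructor
    results = await service.perform_rag_query(query, match_count=5)  # assumed kwargs

    # Low-level: raw vector similarity search, no post-processing.
    raw = search_documents(supabase_client, query, match_count=5)  # assumed args
    return results, raw
```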

### RAG Services (`/services/rag/`)

| Service | Purpose | Key Methods |
|---------|---------|-------------|
| **CrawlingService** | Web crawling operations | `crawl_single_page()`, `crawl_batch_with_progress()`, `crawl_recursive_with_progress()` |

### Knowledge Services (`/services/knowledge/`)

**Recent Refactoring**: The Knowledge API was refactored from a 1,076-line monolith to a 550-line API layer (49% reduction) by extracting business logic into specialized services.

| Service | Purpose | Key Methods |
|---------|---------|-------------|
| **CrawlOrchestrationService** | Orchestrates entire crawl workflow | `orchestrate_crawl()`, `set_progress_id()`, `cancel()`, `is_cancelled()` |
| **KnowledgeItemService** | CRUD operations for knowledge items | `list_items()`, `update_item()`, `get_available_sources()` |
| **CodeExtractionService** | Code block extraction and processing | `extract_and_store_code_examples()`, `extract_code_blocks_from_html()` |
| **DatabaseMetricsService** | Database statistics and metrics | `get_metrics()` |

**Service Layer Benefits:**
- **Separation of Concerns**: Business logic separated from API routing
- **Testability**: Services can be unit tested without Socket.IO dependencies
- **Reusability**: Services used by both API endpoints and MCP tools
- **Progress Tracking**: Integrated with ProgressTracker utility
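
For instance, a hypothetical caller can use `KnowledgeItemService` directly, with no Socket.IO wiring (parameters below are assumptions):

```python
from ..services.knowledge import KnowledgeItemService


async def browse_knowledge(supabase_client):
    service = KnowledgeItemService(supabase_client)  # assumed constructor
    items = await service.list_items(page=1, per_page=20)  # assumed kwargs
    sources = await service.get_available_sources()
    return items, sources
```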

### Project Services (`/services/projects/`)

**Recent Refactoring**: The Projects API was refactored from a monolithic 1,690-line file to a clean service layer architecture, reducing the main API file to 868 lines (a 49% reduction) while extracting business logic into specialized services.

| Service | File | Purpose | Key Methods |
|---------|------|---------|-------------|
| **ProjectService** | `project_service.py` | Core project CRUD operations | `create_project()`, `get_project()`, `update_project()`, `delete_project()`, `list_projects()` |
| **TaskService** | `task_service.py` | Task lifecycle management | `create_task()`, `update_task()`, `archive_task()`, `list_tasks()` |
| **ProjectCreationService** | `project_creation_service.py` | AI-assisted project creation workflow | `create_project_with_ai()`, `generate_features()`, `process_github_repo()` |
| **SourceLinkingService** | `source_linking_service.py` | Project-knowledge source relationships | `update_project_sources()`, `format_projects_with_sources()` |
| **ProgressService** | `progress_service.py` | Real-time operation tracking via Socket.IO | `start_operation()`, `update_progress()`, `complete_operation()`, `error_operation()` |
| **DocumentService** | `document_service.py` | Project document management | `add_document()`, `get_documents()`, `update_document()` |
| **VersioningService** | `versioning_service.py` | JSONB field version control | `create_version()`, `get_versions()`, `restore_version()` |

**Service Layer Benefits:**
- **Separation of Concerns**: Business logic separated from API routing
- **Reusability**: Services can be used by multiple API endpoints and MCP tools  
- **Testability**: Each service can be unit tested independently
- **Socket.IO Integration**: Real-time updates handled at the service layer

**Example Service Coordination:**
```python
# projects_api.py - Thin API layer
# (router, request models, ProjectCreationService, progress_service, and
#  broadcast_project_update are defined/imported elsewhere in this file)
import asyncio
import secrets

@router.post("/projects")
async def create_project(request: CreateProjectRequest):
    progress_id = secrets.token_hex(16)
    
    # Start progress tracking
    progress_service.start_operation(progress_id, 'project_creation')
    
    # Background task coordinates multiple services
    asyncio.create_task(_create_project_with_ai(progress_id, request))
    
    return {"progress_id": progress_id}

# Background workflow using multiple services
async def _create_project_with_ai(progress_id: str, request: CreateProjectRequest):
    creation_service = ProjectCreationService()
    
    # AI-assisted creation with real-time updates
    success, result = await creation_service.create_project_with_ai(
        progress_id=progress_id,
        title=request.title,
        github_repo=request.github_repo
    )
    
    if success:
        # Broadcast update to Socket.IO clients
        await broadcast_project_update()
        await progress_service.complete_operation(progress_id, result)
```

### Core Services (`/services/`)

| Service | Purpose | Key Methods |
|---------|---------|-------------|
| **SourceManagementService** | Manage knowledge sources | `get_available_sources()`, `delete_source()`, `update_source_metadata()` |
| **CredentialService** | Secure credential storage | `get_credential()`, `set_credential()` |
| **PromptService** | AI prompt management | `get_prompt()`, `reload_prompts()` |
| **ThreadingService** | Threading & rate limiting | `batch_process()`, `rate_limited_operation()` |
| **MCPServiceClient** | HTTP client for MCP | `crawl_url()`, `search()`, `delete_source()` |
| **ClientManager** | Database client management | `get_supabase_client()` |
| **MCPSessionManager** | MCP session management | `create_session()`, `validate_session()`, `cleanup_expired()` |
| **CrawlerManager** | Global crawler instance management | `get_crawler()`, `initialize()`, `cleanup()` |
| **LLMProviderService** | Multi-provider LLM support | `get_llm_client()`, `get_llm_client_sync()`, `get_embedding_model()` |

### Modular Architecture Pattern

The services follow a clear architectural pattern:

- **Utility Modules**: Low-level functions for specific operations
  - `vector_search_service.py` - Vector database operations
  - `document_storage_service.py` - Document storage functions
  - `code_storage_service.py` - Code extraction functions
  
- **Service Classes**: High-level orchestration of utilities
  - `SearchService` in `search_services.py`
  - `DocumentStorageService` in `storage_services.py`
  - Knowledge services in `/services/knowledge/`
  - Project services in `/services/projects/`
  
- **Base Classes**: Abstract implementations for extensibility (see the sketch after this list)
  - `BaseStorageService` - Common storage patterns
  
- **Manager Classes**: Singleton instances for global resources
  - `CrawlerManager` - Global crawler instance
  - `MCPSessionManager` - Session management
  
- **Single Responsibility**: Each module has one focused purpose
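
A hedged sketch of extending `BaseStorageService` for a new backend; the inherited helper calls mirror the methods listed in the storage table above, but their exact signatures are assumptions:

```python
from ..services.storage.base_storage_service import BaseStorageService


class NotionStorageService(BaseStorageService):  # hypothetical backend
    """Stores Notion page exports through the shared chunking pipeline."""

    async def store_page(self, page_text: str, page_url: str):
        chunks = self.smart_chunk_text(page_text)    # inherited helper
        metadata = self.extract_metadata(page_text)  # inherited helper
        # Inherited batching keeps progress callbacks flowing during writes.
        await self.batch_process_with_progress(chunks, metadata, source=page_url)  # assumed args
```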

### Embedding Services (`/services/embeddings/`)

| Module | Purpose | Key Functions |
|--------|---------|---------------|
| **embedding_service.py** | OpenAI embedding creation | `create_embeddings_batch()`, `create_embeddings_batch_async()` |
| **contextual_embedding_service.py** | Context-aware embeddings | `generate_contextual_embedding()`, `generate_contextual_embeddings_batch()` |

### Utility Classes (`/utils/progress/`)

| Utility | Purpose | Key Methods |
|---------|---------|-------------|
| **ProgressTracker** | Consolidated Socket.IO progress tracking | `start()`, `update()`, `complete()`, `error()`, `update_batch_progress()`, `update_crawl_stats()` |

<Admonition type="tip" title="Async/Sync Boundaries">
The embedding service provides both sync and async versions:
- Use `create_embeddings_batch_async()` in async contexts (recommended)
- Sync versions exist for backward compatibility but will return zero embeddings if called from async context
- Always prefer async versions for better performance and proper event loop handling
</Admonition>
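
A minimal sketch of the preferred async path (the parameter shape is an assumption):

```python
from ..services.embeddings.embedding_service import create_embeddings_batch_async


async def embed_chunks(chunks: list[str]) -> list[list[float]]:
    # Preferred in async contexts: avoids the zero-embedding fallback that the
    # sync version returns when invoked from a running event loop.
    return await create_embeddings_batch_async(chunks)  # assumed signature
```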

## Service Usage Patterns

### Direct Service Usage (FastAPI)
```python
# Import from new locations (asyncio is used by the cancellation example)
import asyncio

from ..services.source_management_service import SourceManagementService
from ..services.storage import DocumentStorageService
from ..services.search import SearchService
from ..services.knowledge import CrawlOrchestrationService, KnowledgeItemService
from ..services.crawler_manager import get_crawler

# Example: Knowledge item crawling
@router.post("/knowledge-items/crawl")
async def crawl_knowledge_item(request: KnowledgeItemRequest):
    crawler = await get_crawler()
    orchestration_service = CrawlOrchestrationService(crawler, get_supabase_client())
    result = await orchestration_service.orchestrate_crawl(request.dict())
    return result

# Example: Crawl cancellation
@router.post("/knowledge-items/stop/{progress_id}")
async def stop_crawl_task(progress_id: str):
    # Cancel orchestration service (get_active_orchestration and
    # active_crawl_tasks are module-level registries in this API file)
    orchestration = get_active_orchestration(progress_id)
    if orchestration:
        orchestration.cancel()
    
    # Cancel asyncio task
    if progress_id in active_crawl_tasks:
        task = active_crawl_tasks[progress_id]
        if not task.done():
            task.cancel()
            try:
                await asyncio.wait_for(task, timeout=2.0)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                pass
        del active_crawl_tasks[progress_id]
    
    # Emit Socket.IO events (sio is the module-level Socket.IO server)
    await sio.emit('crawl:stopped', {
        'progressId': progress_id,
        'status': 'cancelled',
        'message': 'Crawl cancelled by user'
    }, room=progress_id)
    
    return {'success': True}

# Example: Source deletion
@router.delete("/sources/{source_id}")
async def delete_source(source_id: str):
    service = SourceManagementService(get_supabase_client())
    success, result = service.delete_source(source_id)
    return {"success": success, **result}
```

### HTTP Service Usage (MCP)
```python
import httpx


async def delete_source(ctx: Context, source: str) -> str:
    # Context comes from the MCP framework; API_URL is the FastAPI
    # server's base URL, configured elsewhere
    async with httpx.AsyncClient() as client:
        response = await client.delete(f"{API_URL}/api/sources/{source}")
        return response.text  # MCP tools return the JSON payload as a string
```

### Simplified Socket.IO Emission Pattern (2025)
```python
from ..services.rag.crawling_service import CrawlingService
from ..socketio_app import get_socketio_instance

# Get Socket.IO instance
sio = get_socketio_instance()

async def crawl_with_progress(url: str, progress_id: str):
    # crawler and supabase_client are assumed to be initialized elsewhere
    crawling_service = CrawlingService(crawler, supabase_client)
    
    # Simple direct emission - no complex utils
    await sio.emit('progress_update', {
        'status': 'crawling',
        'percentage': 10,
        'log': f'Starting crawl of {url}'
    }, room=progress_id)
    
    results = await crawling_service.crawl_batch_with_progress(urls=[url])
    return results
```

### ProgressTracker Utility Pattern
```python
from ..utils.progress import ProgressTracker

# Create tracker for an operation (sio comes from get_socketio_instance())
tracker = ProgressTracker(sio, progress_id, 'crawl')

# Start tracking
await tracker.start({
    'currentUrl': url,
    'totalPages': 0,
    'processedPages': 0
})

# Update progress
await tracker.update('crawling', 25, 'Processing page 1 of 4')
await tracker.update_crawl_stats(1, 4, 'https://example.com/page1')

# Complete or error
await tracker.complete({
    'chunksStored': 150,
    'wordCount': 5000
})
# OR
await tracker.error('Failed to crawl: Network timeout')
```

## Key Service Features

### Simplified Real-Time Communication (2025 Pattern)

**Architecture Changes:**
- **No database polling** - Eliminated 2-second polling system that checked for task/project changes
- **Simple @sio.event handlers** - Uses official Socket.IO 2025 documentation pattern
- **No namespace classes** - Removed complex `TasksNamespace` and `ProjectNamespace` classes
- **Root namespace only** - Everything runs on `/` namespace for simplicity
- **Direct room management** - Simple `sio.enter_room()` and `sio.emit()` calls

**Simplified Socket.IO Integration:**
```python
# Services import and use Socket.IO directly
from ..socketio_app import get_socketio_instance

sio = get_socketio_instance()

# Simple emission to rooms - no namespace complexity
await sio.emit('task_updated', task_data, room=project_id)
await sio.emit('progress_update', {'status': 'processing', 'percentage': 50}, room=progress_id)

# Event handlers use simple @sio.event decorators
@sio.event
async def join_project(sid, data):
    await sio.enter_room(sid, data['project_id'])
    tasks = get_tasks_for_project(data['project_id'])  # app-specific lookup
    await sio.emit('initial_tasks', tasks, to=sid)
```

### Threading Service
- Rate limiting for OpenAI API (200k tokens/min)
- Thread pools for CPU and I/O operations
- Socket.IO-safe batch processing
- System metrics monitoring
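
A hedged sketch of batch processing through the service (method signatures and the `batch_size` keyword are assumptions):

```python
from ..services.threading_service import ThreadingService


async def summarize_one(text: str) -> str:
    ...  # per-item LLM call goes here


async def summarize_all(texts: list[str]) -> list[str]:
    threading_service = ThreadingService()  # assumed constructor
    # Routing work through the service keeps OpenAI calls under the
    # ~200k tokens/min limit without blocking the event loop.
    return await threading_service.batch_process(
        items=texts,
        operation=summarize_one,  # hypothetical per-item coroutine
        batch_size=10,            # assumed kwarg
    )
```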

### Credential Service
- Encrypted credential storage
- Category-based organization
- Runtime caching for performance
- Environment variable compatibility
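
A minimal usage sketch (whether `get_credential()` is async, and its fallback behavior, are assumptions based on the notes above):

```python
from ..services.credential_service import CredentialService


async def read_openai_key() -> str:
    credentials = CredentialService()  # assumed constructor
    # Cached after first read; falls back to the environment variable
    # of the same name per the compatibility note above.
    return await credentials.get_credential("OPENAI_API_KEY")  # assumed async
```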

### Document Storage
- Parallel document processing
- Progress reporting via callbacks
- Automatic embedding generation
- Batch operations for efficiency

### Crawler Manager
- Singleton pattern for global crawler instance
- Automatic Docker environment detection
- Proper initialization and cleanup
- Prevents circular imports
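
A sketch of wiring the singleton into FastAPI's lifespan; everything beyond the `initialize()`/`cleanup()` method names listed above is an assumption:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from ..services.crawler_manager import CrawlerManager


@asynccontextmanager
async def lifespan(app: FastAPI):
    manager = CrawlerManager()  # singleton; assumed constructor
    await manager.initialize()  # starts the global crawler instance
    yield
    await manager.cleanup()     # releases browser resources on shutdown


app = FastAPI(lifespan=lifespan)
```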

### LLM Provider Service
- Supports multiple providers: OpenAI, Google Gemini, Ollama
- OpenAI-compatible interface for all providers
- Automatic provider detection from environment
- Both async and sync client creation
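
A provider-agnostic sketch, assuming `get_llm_client()` is importable as a module-level helper; the model name is a placeholder:

```python
from ..services.llm_provider_service import get_llm_client


async def ask(prompt: str) -> str:
    # Returns an OpenAI-compatible client for whichever provider
    # LLM_PROVIDER selects (openai, google, ollama).
    client = await get_llm_client()  # assumed async factory
    response = await client.chat.completions.create(
        model="gpt-4o-mini",  # hypothetical default model
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content
```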

## Service Dependencies

**Refactored Architecture (2025):**

```
FastAPI Endpoints (projects_api.py)
    ↓
Socket.IO Handlers (socketio_handlers.py)
    ↓
Service Classes (ProjectService, TaskService, etc.)
    ↓
Service Functions (add_documents_to_supabase, etc.)
    ↓
Database (Supabase) / External APIs (OpenAI)
```

**Key Architectural Changes:**
- **Extracted Socket.IO Handlers**: Moved from embedded namespace classes to dedicated `socketio_handlers.py` file (~225 lines)
- **Service Layer Separation**: Business logic now in separate service classes instead of embedded in API routes
- **Simplified Socket.IO**: Eliminated complex namespace hierarchy in favor of simple `@sio.event` decorators
- **Progress Tracking**: Centralized in `ProgressService` with real-time Socket.IO broadcasting

## Architecture Benefits

### Improved Code Organization
- **Clear Separation**: Utilities vs. service classes are clearly separated
- **Single Responsibility**: Each module has one focused purpose
- **Reduced Duplication**: Common functionality in base classes
- **Better Maintainability**: Easy to find and update specific functionality
- **Service Layers**: Business logic separated from API routing (knowledge, projects)

### Enhanced Extensibility
- **Base Classes**: New storage services can extend `BaseStorageService`
- **Modular Design**: Easy to add new search algorithms or storage backends
- **Service Composition**: Services can be composed for complex operations
- **Provider Flexibility**: Easy to add new LLM providers via LLMProviderService

### Testing & Development
- **Isolated Testing**: Each service can be unit tested independently
- **Mock Injection**: Services accept clients via dependency injection
- **Clear Interfaces**: Well-defined service contracts
- **Progress Tracking**: Centralized progress management via ProgressTracker

## Service Directory Summary

Complete listing of all service modules organized by functionality:

```
services/
├── storage/                    # Document and code storage
│   ├── base_storage_service.py # Abstract base class
│   ├── storage_services.py     # DocumentStorageService class
│   ├── document_storage_service.py # Storage utilities
│   └── code_storage_service.py # Code extraction utilities
├── search/                     # Search and retrieval
│   ├── search_services.py      # SearchService class
│   └── vector_search_service.py # Vector search utilities
├── knowledge/                  # Knowledge management (refactored)
│   ├── crawl_orchestration_service.py
│   ├── knowledge_item_service.py
│   ├── code_extraction_service.py
│   └── database_metrics_service.py
├── projects/                   # Project management (refactored)
│   ├── project_service.py
│   ├── task_service.py
│   ├── project_creation_service.py
│   ├── source_linking_service.py
│   ├── progress_service.py
│   ├── document_service.py
│   └── versioning_service.py
├── rag/                        # RAG operations
│   └── crawling_service.py     # Web crawling
├── embeddings/                 # Embedding generation
│   ├── embedding_service.py
│   └── contextual_embedding_service.py
└── (core services)             # Infrastructure services
    ├── client_manager.py       # Database client
    ├── crawler_manager.py      # Crawler instance
    ├── credential_service.py   # Credentials
    ├── prompt_service.py       # Prompts
    ├── threading_service.py    # Threading/rate limiting
    ├── mcp_service_client.py   # MCP HTTP client
    ├── mcp_session_manager.py  # MCP sessions
    ├── source_management_service.py # Sources
    └── llm_provider_service.py # LLM providers
```

## Configuration

Services are configured via environment variables:
- `SUPABASE_URL` - Database URL
- `SUPABASE_SERVICE_KEY` - Database service key  
- `OPENAI_API_KEY` - For embeddings (or other provider keys)
- `LLM_PROVIDER` - LLM provider selection (openai, google, ollama)
- `CODE_BLOCK_MIN_LENGTH` - Minimum code block length in characters (default: 1000)
- `USE_CONTEXTUAL_EMBEDDINGS` - Enable contextual embeddings
- `USE_HYBRID_SEARCH` - Enable hybrid search mode
- `USE_RERANKING` - Enable search result reranking

All services use dependency injection for testability and flexibility.
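
As a rough illustration of that pattern, a service might be wired like this (the supabase-py calls are real; the wiring itself is an assumption):

```python
import os

from supabase import create_client

from ..services.source_management_service import SourceManagementService


def build_supabase_client():
    # Required settings fail fast if missing.
    return create_client(os.environ["SUPABASE_URL"], os.environ["SUPABASE_SERVICE_KEY"])


# Optional settings use the documented defaults.
min_code_block_length = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))
use_reranking = os.getenv("USE_RERANKING", "false").lower() == "true"

# Dependency injection: the client is passed in, never created inside the service.
service = SourceManagementService(build_supabase_client())  # hypothetical wiring
```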

## Troubleshooting

### Code Extraction Issues

If code blocks are not being extracted properly:

1. **Check the minimum length**: The default `CODE_BLOCK_MIN_LENGTH` is 1000 characters. This ensures only substantial code blocks are extracted, not small snippets.
   
2. **Verify markdown format**: Code blocks must be wrapped in triple-backtick fences to be recognized.

3. **HTML fallback**: If markdown doesn't contain code blocks, the system will try to extract from HTML using `<pre>` and `<code>` tags.
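
For reference, an illustrative extractor (not the actual `extract_code_blocks()` implementation) that honors the minimum-length threshold might look like:

```python
import os
import re

MIN_LENGTH = int(os.getenv("CODE_BLOCK_MIN_LENGTH", "1000"))
FENCE = "`" * 3  # opening/closing fence marker
FENCE_RE = re.compile(FENCE + r"\w*\n(.*?)" + FENCE, re.DOTALL)


def extract_fenced_blocks(markdown: str, min_length: int = MIN_LENGTH) -> list[str]:
    """Return fenced code bodies at least min_length characters long."""
    blocks = (match.group(1) for match in FENCE_RE.finditer(markdown))
    # Short snippets are skipped by design; only substantial blocks survive.
    return [block for block in blocks if len(block) >= min_length]
```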

### Crawling Performance

The crawling service uses simple configurations for reliability:

- No complex JavaScript waiting by default
- No forced element clicking
- Straightforward markdown extraction

If you need advanced crawling features for specific sites:

1. Consider implementing site-specific handlers
2. Use the `js_code` parameter only when necessary
3. Avoid `wait_for` selectors unless you're certain the elements exist

### Progress Tracking

Progress percentages during a crawl map to these phases:

- **0-45%**: Crawling pages
- **45-50%**: Processing crawl results  
- **50-75%**: Storing documents with real-time batch updates
- **75-90%**: Extracting and storing code examples
- **90-100%**: Finalizing and cleanup

If progress appears stuck, check the logs for batch processing updates.